package com.xiaohu.sink;

import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.function.SerializableSupplier;

import javax.sql.XADataSource;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class JDBCExactlyOnceSinkDemo {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE); // 精确一次必须开启checkpoint

        // 假设这是我们的数据源
//        DataStream<String> text = env.fromElements(
//                "Hello, World!",
//                "Flink is awesome!",
//                "This is a test."
//        );

        DataStreamSource<String> text = env.socketTextStream("master", 7777);


        // 转换数据以匹配目标表的列
        DataStream<String[]> mappedData = text.map(new MapFunction<String, String[]>() {
            @Override
            public String[] map(String value) {
                return new String[]{null, value};  // null 表示自增ID
            }
        });

        // 调用exactlyOnceSink算子
        mappedData.addSink(JdbcSink.exactlyOnceSink(
                "INSERT INTO sink_jdbc (info) VALUES (?)",
                new JdbcStatementBuilder<String[]>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, String[] strings) throws SQLException {
                        preparedStatement.setString(1, strings[1]);
                    }
                },
                JdbcExecutionOptions.builder()
                        .withMaxRetries(0) // 重试0次 确保恰好一次语义
                        .build(),
                JdbcExactlyOnceOptions.builder()
                        .withTransactionPerConnection(true) // mysql和PostgreSQL只允许每个连接执行一个 XA 事务
                        .build(),
                new SerializableSupplier<XADataSource>() {
                    @Override
                    public XADataSource get() {
                        MysqlXADataSource xaDataSource = new MysqlXADataSource();
                        xaDataSource.setUrl("jdbc:mysql://master:3306/xiaohu_db?useUnicode=true&characterEncoding=utf-8&useSSL=false");
                        xaDataSource.setUser("root");
                        xaDataSource.setPassword("123456");
                        return xaDataSource;
                    }
                }
        ));

        // 执行任务
        env.execute("Write to MySQL Example");
    }
}
